The first Diffie-Hellman exchange authenticates Alice to Bob.
The second Diffie-Hellman exchange authenticates Bob to Alice.
The third Diffie-Hellman exchange creates a shared connection secret with forward secrecy.
The fourth Diffie-Hellman exchange prevents replay attacks, as long as the one-time key is sent to only one person and then deleted from the server.
import base64
import json
import os

from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PrivateKey, Ed448PublicKey
from cryptography.hazmat.primitives.asymmetric.x448 import X448PrivateKey, X448PublicKey
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

# Printed whenever a signed prekey fails verification (server and client side).
_BAD_SIGNATURE_MESSAGE = "[-] Signature does't match the Prekey. Either Bad Signature, Wrong Key, wrong username"

# Number of one-time prekeys each user generates up front.
_ONE_TIME_KEY_COUNT = 5


class Signal_Server(object):
    """In-memory key directory that users publish their public keys to.

    The server stores, per username: an X448 identity key, an Ed448 identity
    signing key, one signed prekey (raw bytes plus signature) and a list of
    one-time prekeys (raw bytes).
    """

    def __init__(self):
        self.idenity_database = {}       # username -> X448PublicKey
        self.idenity_database_sign = {}  # username -> Ed448PublicKey
        self.preKey_database = {}        # username -> {"key": bytes, "signature": bytes}
        self.onetime_database = {}       # username -> list of raw one-time public keys
        self.info = b"Signal_Server"

    def registerUser(self, username, idenity_key, idenity_key_sign):
        """Store the raw X448 identity key and raw Ed448 signing key for ``username``."""
        self.idenity_database[username] = X448PublicKey.from_public_bytes(idenity_key)
        self.idenity_database_sign[username] = Ed448PublicKey.from_public_bytes(idenity_key_sign)

    def getUsersIdenityKey(self, username):
        """Return the raw bytes of the user's X448 identity public key."""
        return self.idenity_database[username].public_bytes_raw()

    def getUsersIdenitySigningKey(self, username):
        """Return the raw bytes of the user's Ed448 signing public key."""
        return self.idenity_database_sign[username].public_bytes_raw()

    def getUsersPreKey(self, username):
        """Return the stored ``{"key": ..., "signature": ...}`` dict for the user."""
        return self.preKey_database[username]

    def getUserOneTimeKey(self, username):
        """Return the user's oldest one-time prekey and remove it from the server.

        Raises ``KeyError`` for an unknown user and ``IndexError`` when the
        user has no one-time keys left.
        """
        # Return then remove key
        return self.onetime_database[username].pop(0)

    def updateUserPreKey(self, username, preKey_bytes, signature):
        """Store a signed prekey after checking it against the user's signing key.

        Returns ``True`` when the signature verifies and the prekey was stored,
        ``False`` (nothing stored) when it does not.
        """
        signing_key = self.idenity_database_sign[username]
        # Ed448PublicKey.verify() returns None on success and raises
        # InvalidSignature on failure, so its result must not be used as a bool.
        try:
            signing_key.verify(signature, preKey_bytes)
        except InvalidSignature:
            print(_BAD_SIGNATURE_MESSAGE)
            return False
        print("[+] Signature Accepted")
        self.preKey_database[username] = {"key": preKey_bytes, "signature": signature}
        return True

    def addOneTimePreKey(self, username, keys_bytes):
        """Append one raw one-time public key to the user's list."""
        # Check from the correct User???  (the caller is not authenticated here)
        self.onetime_database.setdefault(username, []).append(keys_bytes)


class Signal_User(object):
    """A client holding the private keys used for the four-DH key agreement."""

    def __init__(self, username):
        self.username = username
        self.idenity_private_key = None       # X448 identity key
        self.idenity_private_key_sign = None  # Ed448 signing key
        self.private_pre_key = None           # X448 signed prekey
        self.one_time_private_keys = []       # X448 one-time prekeys, oldest first
        self.ephemeral_key = None             # set by generateEphemeralKey()
        self._generateKeys()

    def _generateKeys(self):
        """Generate the signing key, identity key, signed prekey and one-time keys."""
        self.idenity_private_key_sign = Ed448PrivateKey.generate()
        # The X448 identity key is generated independently of the Ed448 signing
        # key. Background on reusing Ed448 key material for X448 (an Ed448
        # private key is 57 bytes, an X448 one 56 bytes):
        # https://crypto.stackexchange.com/questions/99974/curve448-can-ed448-key-material-be-reused-for-x448
        self.idenity_private_key = X448PrivateKey.generate()
        self.private_pre_key = X448PrivateKey.generate()
        # Lets Generate 5 Prekeys
        self.one_time_private_keys.extend(
            X448PrivateKey.generate() for _ in range(_ONE_TIME_KEY_COUNT)
        )

    def registerUser(self, signal_server):
        """Publish this user's identity and signing public keys to the server."""
        signal_server.registerUser(
            self.username,
            self.idenity_private_key.public_key().public_bytes_raw(),
            self.idenity_private_key_sign.public_key().public_bytes_raw(),
        )

    def generateEphemeralKey(self):
        """Create a fresh X448 ephemeral key and store it on ``self.ephemeral_key``."""
        self.ephemeral_key = X448PrivateKey.generate()

    def send_PreKeyToSignalServer(self, signal_server):
        """Sign the public prekey with the signing key and upload both."""
        # Get Public Key
        prekey_bytes = self.private_pre_key.public_key().public_bytes_raw()
        signature = self.idenity_private_key_sign.sign(prekey_bytes)
        # Send to Server
        signal_server.updateUserPreKey(self.username, prekey_bytes, signature)

    def _fetchVerifiedPreKey(self, signal_server, username):
        """Fetch ``username``'s signed prekey and verify it; return an X448PublicKey.

        Raises ``InvalidSignature`` (after printing a diagnostic) when the
        prekey is not signed by the signing key the server reports for that user.
        """
        # Get Destination User's Signed Public Key
        signing_key = Ed448PublicKey.from_public_bytes(
            signal_server.getUsersIdenitySigningKey(username)
        )
        signed_prekey = signal_server.getUsersPreKey(username)
        # Check that the signature is correct for that key. verify() raises on
        # failure; it never returns a truthy value.
        try:
            signing_key.verify(signed_prekey["signature"], signed_prekey["key"])
        except InvalidSignature:
            print(_BAD_SIGNATURE_MESSAGE)
            raise
        # Now that we have checked that the server did not give us the wrong
        # user key, lets continue.
        return X448PublicKey.from_public_bytes(signed_prekey["key"])

    def generateDH1_sender(self, signal_server, username):
        """DH1 = our identity key x destination's signed prekey.

        Raises ``InvalidSignature`` when the destination's prekey signature
        does not verify.
        """
        dst_prekey = self._fetchVerifiedPreKey(signal_server, username)
        return self.idenity_private_key.exchange(dst_prekey)

    def generateDH2_sender(self, signal_server, username):
        """DH2 = new ephemeral key x destination's identity key.

        Replaces ``self.ephemeral_key``; DH3 and DH4 reuse that same key, so
        this must be called before them.
        """
        dst_idenity_key = X448PublicKey.from_public_bytes(
            signal_server.getUsersIdenityKey(username)
        )
        # Generate Ephemeral Key for the rest of the Key generation
        self.generateEphemeralKey()
        return self.ephemeral_key.exchange(dst_idenity_key)

    def generateDH3_sender(self, signal_server, username):
        """DH3 = our ephemeral key x destination's signed prekey.

        Raises ``InvalidSignature`` when the destination's prekey signature
        does not verify.
        """
        dst_prekey = self._fetchVerifiedPreKey(signal_server, username)
        return self.ephemeral_key.exchange(dst_prekey)

    def generateDH4_sender(self, signal_server, username):
        """DH4 = our ephemeral key x one of the destination's one-time keys.

        Consumes (removes) one one-time key from the server.
        """
        dst_one_time_key = X448PublicKey.from_public_bytes(
            signal_server.getUserOneTimeKey(username)
        )
        return self.ephemeral_key.exchange(dst_one_time_key)

    def send_OneTimeKeysToSignalServer(self, signal_server):
        """Upload the public half of every one-time key to the server."""
        for private_key in self.one_time_private_keys:
            signal_server.addOneTimePreKey(
                self.username, private_key.public_key().public_bytes_raw()
            )

    def generateSecretKey(self, dh1, dh2, dh3, dh4):
        """Derive the 32-byte message key from the four DH outputs.

        The KDF input is a run of 0xFF bytes followed by ``dh1..dh4``:
        57 bytes with SHA-512 for X448 keys, 32 bytes with SHA-256 otherwise.
        """
        # isinstance(), not ``is``: comparing an instance to its class is
        # always False, which silently selected the non-X448 parameters.
        if isinstance(self.idenity_private_key, X448PrivateKey):
            prefix = 57 * b"\xFF"
            algorithm = hashes.SHA512()
            salt = 64 * b"\x00"
        else:
            prefix = 32 * b"\xFF"
            algorithm = hashes.SHA256()
            salt = 32 * b"\x00"
        # Output stays at 32 bytes in both cases: the result is used directly
        # as an AESGCM key, which only accepts 16-, 24- or 32-byte keys.
        kdf = PBKDF2HMAC(algorithm=algorithm, length=32, salt=salt, iterations=480000)
        return kdf.derive(prefix + dh1 + dh2 + dh3 + dh4)

    def AADAndEncryptMessage(self, secret_key, message, signal_server, username):
        """Encrypt ``message`` with AES-GCM, binding both identities as AAD.

        Returns a dict with ``"aad"`` (JSON bytes), ``"nonce"`` (12 random
        bytes) and ``"cyphertext"``.
        """
        dst_idenity_key = signal_server.getUsersIdenityKey(username)

        def encode_key(raw):
            return base64.b64encode(raw).decode('ascii')

        # Dest Ident Key, Source Ident Key, dest username, source username, server info
        aad = {
            "src_idenity_key": encode_key(self.idenity_private_key.public_key().public_bytes_raw()),
            "src_username": self.username,
            "src_ephemeral_key": encode_key(self.ephemeral_key.public_key().public_bytes_raw()),
            "dst_idenity_key": encode_key(dst_idenity_key),
            "dst_username": username,
            "server_info": signal_server.info.decode('ascii'),
        }
        print(aad)
        aad_bytes = json.dumps(aad).encode('ascii')
        nonce = os.urandom(12)
        cyphertext = AESGCM(secret_key).encrypt(nonce, message, aad_bytes)
        return {"aad": aad_bytes, "nonce": nonce, "cyphertext": cyphertext}

    def receiveMessage(self, message):
        """Recompute the four DH values from the AAD and decrypt ``message``.

        Consumes this user's oldest one-time private key. Decryption fails
        (the AESGCM ``decrypt`` call raises) if the derived key or the AAD
        does not match.
        """
        aad = json.loads(message["aad"])
        src_idenity_key = X448PublicKey.from_public_bytes(
            base64.b64decode(aad["src_idenity_key"])
        )
        src_ephemeral_key = X448PublicKey.from_public_bytes(
            base64.b64decode(aad["src_ephemeral_key"])
        )

        # DH1 = Src Idenity Key, Dst Signed Pre_key
        dh1 = self.private_pre_key.exchange(src_idenity_key)
        # DH2 = Src Ephemeral Key, Dst Idenity Key
        dh2 = self.idenity_private_key.exchange(src_ephemeral_key)
        # DH3 = Src Ephemeral Key, Dst Signed Pre_key
        dh3 = self.private_pre_key.exchange(src_ephemeral_key)
        # DH4 = Src Ephemeral Key, Dst One Time Key
        # NOTE(review): assumes messages arrive in the same order the server
        # handed out one-time keys; the message does not say which key was used.
        one_time_private_key = self.one_time_private_keys.pop(0)
        dh4 = one_time_private_key.exchange(src_ephemeral_key)

        # Generate Secret Key
        secret_key = self.generateSecretKey(dh1, dh2, dh3, dh4)
        # Decrypt and Verify Cypher text
        return AESGCM(secret_key).decrypt(
            message["nonce"], message["cyphertext"], message["aad"]
        )


def main():
    """Run a demo: Alice sends one encrypted message to an offline Bob."""
    # Generate Server
    signal_server = Signal_Server()

    # Generate Users
    alice = Signal_User("alice")
    bob = Signal_User("bob")

    # User Setups
    for user in (alice, bob):
        user.registerUser(signal_server)
        user.send_PreKeyToSignalServer(signal_server)
        user.send_OneTimeKeysToSignalServer(signal_server)

    # Setup done, lets send a message.
    # Assume that Bob is offline: Alice can only exchange messages with the server.
    # DH1 = Alice's Idenity Key, Bob's Signed Pre_key
    dh1_a = alice.generateDH1_sender(signal_server, "bob")
    # DH2 = Alice's Ephemeral Key, Bob's Idenity Key
    # Alice generates a new key for this exchange on the fly
    dh2_a = alice.generateDH2_sender(signal_server, "bob")
    # DH3 = Alice's Ephemeral Key, Bob's Signed Pre_key
    dh3_a = alice.generateDH3_sender(signal_server, "bob")
    # DH4 = Alice's Ephemeral Key, Bob's One Time Key
    dh4_a = alice.generateDH4_sender(signal_server, "bob")

    # Concat secrets and feed through a KDF.
    # The KDF input prefix is 57*b"\xFF" for X448 or 32*b"\xFF" for X25519.
    secret_key = alice.generateSecretKey(dh1_a, dh2_a, dh3_a, dh4_a)

    # TODO: Alice should delete her ephemeral private key and DH outputs once
    # the message is encrypted; this demo does not do so.
    encrypted_message = alice.AADAndEncryptMessage(
        secret_key, b"Test Message 12345", signal_server, "bob"
    )

    ####### Time for Bob to get message
    print(f"Message Recived: {bob.receiveMessage(encrypted_message)}")


if __name__ == '__main__':
    main()